{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "374411c5-a48f-4739-9031-d638638633a3",
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "// In python use: from pyspark.sql.functions import broadcast, split, lit\n",
    "import org.apache.spark.sql.functions.{broadcast, split, lit}\n",
    "\n",
    "\n",
    "val matchesBucketed = spark.read.option(\"header\", \"true\")\n",
    "                        .option(\"inferSchema\", \"true\")\n",
    "                        .csv(\"/home/iceberg/data/matches.csv\")\n",
    "val matchDetailsBucketed =  spark.read.option(\"header\", \"true\")\n",
    "                        .option(\"inferSchema\", \"true\")\n",
    "                        .csv(\"/home/iceberg/data/match_details.csv\")\n",
    "\n",
    "\n",
    "spark.sql(\"\"\"DROP TABLE IF EXISTS bootcamp.matches_bucketed\"\"\")\n",
    "val bucketedDDL = \"\"\"\n",
    "CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (\n",
    "     match_id STRING,\n",
    "     is_team_game BOOLEAN,\n",
    "     playlist_id STRING,\n",
    "     completion_date TIMESTAMP\n",
    " )\n",
    " USING iceberg\n",
    " PARTITIONED BY (completion_date, bucket(16, match_id));\n",
    " \"\"\"\n",
    " spark.sql(bucketedDDL)\n",
    "\n",
    "// matchesBucketed.select(\n",
    "//     $\"match_id\", $\"is_team_game\", $\"playlist_id\", $\"completion_date\"\n",
    "//     )\n",
    "//     .write.mode(\"append\")\n",
    "//     .partitionBy(\"completion_date\")\n",
    "//   .bucketBy(16, \"match_id\").saveAsTable(\"bootcamp.matches_bucketed\")\n",
    "\n",
    "\n",
    "// val bucketedDetailsDDL = \"\"\"\n",
    "// CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (\n",
    "//     match_id STRING,\n",
    "//     player_gamertag STRING,\n",
    "//     player_total_kills INTEGER,\n",
    "//     player_total_deaths INTEGER\n",
    "// )\n",
    "// USING iceberg\n",
    "// PARTITIONED BY (bucket(16, match_id));\n",
    "// \"\"\"\n",
    "// spark.sql(bucketedDetailsDDL)\n",
    "\n",
    "// matchDetailsBucketed.select(\n",
    "//     $\"match_id\", $\"player_gamertag\", $\"player_total_kills\", $\"player_total_deaths\")\n",
    "//     .write.mode(\"append\")\n",
    "//   .bucketBy(16, \"match_id\").saveAsTable(\"bootcamp.match_details_bucketed\")\n",
    "\n",
    "spark.conf.set(\"spark.sql.autoBroadcastJoinThreshold\", \"-1\")\n",
    "\n",
    "matchesBucketed.createOrReplaceTempView(\"matches\")\n",
    "matchDetailsBucketed.createOrReplaceTempView(\"match_details\")\n",
    "\n",
    "spark.sql(\"\"\"\n",
    "    SELECT * FROM bootcamp.match_details_bucketed mdb JOIN bootcamp.matches_bucketed md \n",
    "    ON mdb.match_id = md.match_id\n",
    "    AND md.completion_date = DATE('2016-01-01')\n",
    "        \n",
    "\"\"\").explain()\n",
    "\n",
    "\n",
    "spark.sql(\"\"\"\n",
    "    SELECT * FROM match_details mdb JOIN matches md ON mdb.match_id = md.match_id\n",
    "        \n",
    "\"\"\").explain()\n",
    "\n",
    "// spark.conf.set(\"spark.sql.autoBroadcastJoinThreshold\", \"1000000000000\")\n",
    "\n",
    "// val broadcastFromThreshold = matches.as(\"m\").join(matchDetails.as(\"md\"), $\"m.match_id\" === $\"md.match_id\")\n",
    "//   .select($\"m.completion_date\", $\"md.player_gamertag\",  $\"md.player_total_kills\")\n",
    "//   .take(5)\n",
    "\n",
    "// val explicitBroadcast = matches.as(\"m\").join(broadcast(matchDetails).as(\"md\"), $\"m.match_id\" === $\"md.match_id\")\n",
    "//   .select($\"md.*\", split($\"completion_date\", \" \").getItem(0).as(\"ds\"))\n",
    "\n",
    "// val bucketedValues = matchDetailsBucketed.as(\"mdb\").join(matchesBucketed.as(\"mb\"), $\"mb.match_id\" === $\"mdb.match_id\").explain()\n",
    "// // .take(5)\n",
    "\n",
    "// val values = matchDetailsBucketed.as(\"m\").join(matchesBucketed.as(\"md\"), $\"m.match_id\" === $\"md.match_id\").explain()\n",
    "\n",
    "// explicitBroadcast.write.mode(\"overwrite\").insertInto(\"match_details_bucketed\")\n",
    "\n",
    "// matches.withColumn(\"ds\", split($\"completion_date\", \" \").getItem(0)).write.mode(\"overwrite\").insertInto(\"matches_bucketed\")\n",
    "\n",
    "// spark.sql(bucketedSQL)\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fc8adb02-d5bd-4e84-a671-48991772d233",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6f1eecb6-ca9a-4b5c-b046-b3a0dd1ff3bf",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "spylon-kernel",
   "language": "scala",
   "name": "spylon-kernel"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "help_links": [
    {
     "text": "MetaKernel Magics",
     "url": "https://metakernel.readthedocs.io/en/latest/source/README.html"
    }
   ],
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "0.4.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
